<?php

/**
 * DB队列, 高并发下不保证入库顺序
 * Class DBQueue
 */
class DBQueue extends Model
{
    const table_message = 'queue_message';
    const table_consumer = 'queue_consumer';
    const table_handle = 'queue_handle';

    //消息状态: queue_message.status
    const status_error = -1;
    const status_init = 0;
    const status_notify_start = 1; //开始分发
    const status_notify_end = 2; //分发完毕

    //消费者状态: queue_consumer.status
    const consumer_status_yes = 0; //监听中
    const consumer_status_no = 1; //不再监听

    //消息处理状态
    const handle_status_error = -1; //消息处理失败
    const handle_status_init = 0; //消息未处理
    const handle_status_start = 1; //消息处理中
    const handle_status_end = 2; //处理完毕

    /**
     * 创建消息
     * @param string $topic 消息类型
     * @param string|array $message json
     * @param string $tag 便于筛选用的字符
     * @throws Exception
     */
    public static function push($topic, $message, $tag='')
    {
        if (is_array($message)) {
            $message = json_encode($message);
        }
        $msgCode = UniqueCode::getMessageCode();
        return self::link(self::table_message)->insert([
            'topic_name' => $topic,
            'msg_code' => $msgCode,
            'tag' => $tag,
            'message' => $message,
        ])->insertId;
    }

    //取出一条消息
    public static function pop($topic='', $status=[])
    {
        if (empty($status)) {
            $status = [self::status_init];
        }

        if (!empty($topic)) {
            return self::link(self::table_message)
                ->where(['topic_name' => $topic])
                ->whereIn('status', $status)
                ->order('id')
                ->select()
                ->getOne();
        } else {
            return self::link(self::table_message)
                ->whereIn('status', $status)
                ->order('id')
                ->select()
                ->getOne();
        }

    }

    //更新消息状态
    public static function updateMsgStatus($id, $status)
    {
        return self::link(self::table_message)
            ->where(['id' => $id])
            ->updateVal(['status' => $status])
            ->update();
    }

    //获取尚在监听的消费者信息
    public static function getConsumer($topic)
    {
        return self::link(self::table_consumer)
            ->fields('name,interface,topic_name')
            ->where(['topic_name' => $topic])
            ->where(['status' => self::consumer_status_yes])
            ->order('id')
            ->select()
            ->getAll();
    }

    //消费者收到消息, 记录下来
    public static function addHandleRecord($consumer, $msgCode, $topic)
    {
        return self::link(self::table_handle)->insert([
            'name' => $consumer,
            'msg_code' => $msgCode,
            'topic_name' => $topic,
        ])->insertId;
    }

    //更新消息处理状态
    public static function updateHandleStatus($id, $status)
    {
        return self::link(self::table_handle)
            ->where(['id' => $id])
            ->updateVal(['status' => $status])
            ->update();
    }

    //是否已消费过某个消息
    public static function hasHandled($consumer, $msgCode )
    {
        return self::link(self::table_handle)
            ->fields('id')
            ->where([
                'name' => $consumer,
                'msg_code' => $msgCode,])
            ->whereIn('status', [
                self::handle_status_init,
                self::handle_status_start,
                self::handle_status_end])
            ->select()
            ->getOne();
    }


    //分发消息, 由crontab定时任务调用
    public static function handle($message)
    {
        try {
            FileLog::info("开始处理消息: ". $message['id'], 'db_queue');

            $msgBody = json_decode($message['message'], true);
            $msgId = $message['id'];
            $msgCode = $message['msg_code'];
            $topic = $message['topic_name'];

            $msgBody['msg_code'] = $msgCode;
            $msgBody['topic_name'] = $topic;
            $msgBody['time'] = REQUEST_TIME;
            $msgBody['trace_id'] = FileLog::$uuid;
            $msgBody['app_id'] = 'queue_db';
            $msgBody['sign'] = Sign::getMd5Sign('queue_db', $msgBody);

            self::updateMsgStatus($msgId, self::status_notify_start); //开始通知

            $consumers = self::getConsumer($topic);
            FileLog::info($consumers, "$msgCode $topic 获取消费者: ", 'db_queue');
            foreach ($consumers as $con) {
                if (empty($con['interface'])) {
                    continue;
                }
                FileLog::info("$msgCode $topic 分发消息开始: ".$con['name'], 'db_queue');
                $rs = ICurl::ini($con['interface'])
                    ->ignoreSSL()
                    ->setConnectTimeOutMs(200)
                    ->setExecTimeOutMs(200)
                    ->setPostData(http_build_query($msgBody))
                    ->post();

                FileLog::info("$msgCode $topic 分发消息结束: ".$con['name'].' 返回结果: '.$rs, 'db_queue');
            }

            self::updateMsgStatus($msgId, self::status_notify_end); //通知结束
            return true;
        } catch (Exception $e) {
            self::updateMsgStatus($msgId, self::status_error);
            FileLog::info("$msgCode $topic 分发消息失败: ".$e->getMessage(), 'db_queue');
            return false;
        }

    }

}
